{
 "metadata": {
  "name": ""
 },
 "nbformat": 3,
 "nbformat_minor": 0,
 "worksheets": [
  {
   "cells": [
    {
     "cell_type": "heading",
     "level": 1,
     "metadata": {},
     "source": [
      "Chunking the MNIST8M dataset and store the chunks in the cloud"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "This notebook is an example to demonstrate how to preprocess a large dataset in the svmlight format to convert into chunked, dense numpy arrays that are them compressed individually and stored in a cloud object store on Amazon S3 or Azure Blob Store for later consumption by machine learning models."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "import re\n",
      "import bz2\n",
      "import os\n",
      "from os.path import expanduser, join, exists\n",
      "from configparser import ConfigParser\n",
      "from time import time\n",
      "\n",
      "import numpy as np\n",
      "from concurrent.futures import ThreadPoolExecutor\n",
      "\n",
      "from libcloud.storage.types import Provider\n",
      "from libcloud.storage.types import ContainerDoesNotExistError\n",
      "from libcloud.storage.types import ObjectDoesNotExistError\n",
      "from libcloud.storage.providers import get_driver\n",
      "\n",
      "\n",
      "DATA_FOLDER = expanduser('~/data/mnist8m')\n",
      "SVMLIGHT_DATA_FOLDER = join(DATA_FOLDER, 'svmlight')\n",
      "NUMPY_DATA_FOLDER = join(DATA_FOLDER, 'numpy')\n",
      "\n",
      "MNIST8M_SRC_URL = ('http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/'\n",
      "                   'datasets/multiclass/mnist8m.bz2')\n",
      "MNIST8M_SRC_FILENAME = MNIST8M_SRC_URL.rsplit('/', 1)[1]\n",
      "MNIST8M_SRC_FILEPATH = join(DATA_FOLDER, MNIST8M_SRC_FILENAME)\n",
      "\n",
      "\n",
      "CHUNK_FILENAME_PREFIX = \"mnist8m-chunk-\"\n",
      "\n",
      "CHUNK_SIZE = 100000"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 15
    },
    {
     "cell_type": "heading",
     "level": 2,
     "metadata": {},
     "source": [
      "Decompressing and chunking the source dataset"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Download the `mnist8m.bz2` source file into the data folder if not previously downloaded:"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "if not exists(DATA_FOLDER):\n",
      "    os.makedirs(DATA_FOLDER)\n",
      "\n",
      "if not exists(MNIST8M_SRC_FILEPATH):\n",
      "    cmd = \"(cd '%s' && wget -c '%s')\" % (DATA_FOLDER, MNIST8M_SRC_URL)\n",
      "    print(cmd)\n",
      "    os.system(cmd)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 16
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Decompress the big bz2 source file and chunk the source svmlight formatted data file to make it easier to process it in parallel:"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "if not exists(SVMLIGHT_DATA_FOLDER):\n",
      "    os.makedirs(SVMLIGHT_DATA_FOLDER)\n",
      "\n",
      "chunk_filenames = [fn for fn in os.listdir(SVMLIGHT_DATA_FOLDER)\n",
      "                   if (fn.startswith(CHUNK_FILENAME_PREFIX)\n",
      "                       and fn.endswith('.svmlight'))]\n",
      "chunk_filenames.sort()\n",
      "\n",
      "\n",
      "def get_svmlight_filename(chunk_idx):\n",
      "    chunk_filename = \"%s%03d.svmlight\" % (CHUNK_FILENAME_PREFIX, chunk_idx)\n",
      "    return join(SVMLIGHT_DATA_FOLDER, chunk_filename)\n",
      "\n",
      "\n",
      "if not chunk_filenames:\n",
      "    chunk_filenames = []\n",
      "    with bz2.BZ2File(MNIST8M_SRC_FILEPATH) as source:\n",
      "        target, line_no, chunk_idx = None, 0, 0\n",
      "        for line in source:\n",
      "            line_no += 1\n",
      "            if target is None:\n",
      "                chunk_filename = get_svmlight_filename(chunk_idx)\n",
      "                target = open(chunk_filename, 'wb')\n",
      "                chunk_idx += 1\n",
      "                chunk_filenames.append(chunk_filename)\n",
      "                \n",
      "            target.write(line)\n",
      "                \n",
      "            if line_no >= CHUNK_SIZE:\n",
      "                target.close()\n",
      "                target, line_no = None, 0\n",
      "        if target is not None:\n",
      "            target.close()"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 22
    },
    {
     "cell_type": "heading",
     "level": 2,
     "metadata": {},
     "source": [
      "Parsing the svmlight format in parallel and compressing the resulting chunks locally"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Parse the svmlight formatted chunks into dense numpy arrays and store the resulting chunks as compressed binary files using NumPy own format."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "from IPython.parallel import Client\n",
      "client = Client()\n",
      "lb_view = client.load_balanced_view()\n",
      "len(lb_view)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 23,
       "text": [
        "4"
       ]
      }
     ],
     "prompt_number": 23
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "def parse_svmlight_chunk(input_chunk_filename, output_chunk_filename,\n",
      "                         output_chunk_labels_filename,\n",
      "                         n_features, chunk_size=CHUNK_SIZE):\n",
      "    # Import dependencies lazily to be able to run this function\n",
      "    # on remote nodes of the cluster in parallel with IPython\n",
      "    from sklearn.datasets import load_svmlight_file\n",
      "\n",
      "    if (not exists(output_chunk_filename)\n",
      "        or not exists(output_chunk_labels_filename)):\n",
      "        X, y = load_svmlight_file(input_chunk_filename, n_features=n_features)\n",
      "        np.savez_compressed(output_chunk_filename, X.toarray() / 255.)\n",
      "        np.savez_compressed(output_chunk_labels_filename, y)\n",
      "\n",
      "\n",
      "def get_numpy_filenames(i):\n",
      "    data = \"%s%03d_data.npz\" % (CHUNK_FILENAME_PREFIX, chunk_idx)\n",
      "    labels = \"%s%03d_labels.npz\" % (CHUNK_FILENAME_PREFIX, chunk_idx)\n",
      "    return (\n",
      "        join(NUMPY_DATA_FOLDER, data),\n",
      "        join(NUMPY_DATA_FOLDER, labels),\n",
      "    )\n",
      "\n",
      "    \n",
      "tasks = []\n",
      "n_features = 28 ** 2 # hardcoded for now\n",
      "\n",
      "for i in range(81): # 8100000 lines // 100000 lines per chunk:\n",
      "    svmlight_chunk_name = get_svmlight_filename(i)\n",
      "    data_chunk_name, label_chunk_name = get_numpy_filenames(i)\n",
      "    tasks.append(lb_view.apply(parse_svmlight_chunk,\n",
      "                               svmlight_chunk_name,\n",
      "                               data_chunk_name,\n",
      "                               label_chunk_name,\n",
      "                               n_features))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 24
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "sum(t.ready() for t in tasks), len(tasks)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 30,
       "text": [
        "(0, 81)"
       ]
      }
     ],
     "prompt_number": 30
    },
    {
     "cell_type": "heading",
     "level": 2,
     "metadata": {},
     "source": [
      "Uploading the results to a cloud store"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "CONFIGFILE_PATH = 'cloudstorage.ini'"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 112
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Let's use [Apache Libcloud](http://libcloud.apache.org) to upload the chunk objects to a permanent store for later usage in ephemeral VMs. We will store the credential in a configuration file named `cloudstorage.ini`. Here is the expected content for the Windows Azure Cloud:\n",
      "\n",
      "```\n",
      "[account]\n",
      "libcloud_provider = azure_blobs\n",
      "account_name = myacount\n",
      "account_secret = primarykey\n",
      "```\n",
      "\n",
      "On Amazon S3, the config file would look like:\n",
      "\n",
      "```\n",
      "[account]\n",
      "libcloud_provider = s3\n",
      "account_name = aws_key_id\n",
      "account_secret = aws_secret_key\n",
      "```\n",
      "\n",
      "Apache Libcloud supports many more [Cloud Object Store providers](https://ci.apache.org/projects/libcloud/docs/storage/supported_providers.html).\n",
      "\n",
      "The objects will be stored in a specific container. On some providers, the container name must be globally unique (such as is the case for bucket names on S3). On others like Azure, the container names are local to the cloud storage account. In case of conflict, just change the container name: "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "CONTAINER_NAME = \"mnist8m\""
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 110
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "The following function parse the `cloudstorage.ini` file and build a Libcloud driver instance. This instance is not thread safe, hence we wrap the driver instanciation in a function to be reused in individual threads."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "def build_driver(configfile_path=CONFIGFILE_PATH, section='account'):\n",
      "    config = ConfigParser()\n",
      "    config.read(configfile_path)\n",
      "    provider_name = config.get(section, 'libcloud_provider')\n",
      "    driver_type = get_driver(provider_name)\n",
      "    account_name = config.get(section, 'account_name')\n",
      "    account_secret = config.get(section, 'account_secret')\n",
      "    return driver_type(account_name, account_secret)\n",
      "\n",
      "driver = build_driver()"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 103
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "The following utility function checks that a container with a specific name exits on the Cloud Storage provider, otherwise it creates it:"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "def get_or_create_container(driver, container_name=CONTAINER_NAME):\n",
      "    try:\n",
      "        return driver.get_container(container_name)\n",
      "    except ContainerDoesNotExistError:\n",
      "        return driver.create_container(container_name)\n",
      "    \n",
      "container = get_or_create_container(driver)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 104
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "We can now write a function that uploads invidual local files to a target object container. As this function will be called in parallel in various threads we instanciate a dedicated driver inside."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "def upload_object(local_folder, object_name, container_name=CONTAINER_NAME, skip_if_exists=True):\n",
      "    driver = build_driver()  # libcloud drivers are not thread-safe\n",
      "    container = get_or_create_container(driver, container_name)\n",
      "    filepath = os.path.join(local_folder, object_name)\n",
      "    if skip_if_exists:\n",
      "        try:\n",
      "            # Check the size to deal with partially uploaded files\n",
      "            ob =  container.get_object(object_name)\n",
      "            if ob.size == os.stat(filepath).st_size:\n",
      "                return ob\n",
      "        except ObjectDoesNotExistError:\n",
      "            pass\n",
      "    return container.upload_object(filepath, object_name,\n",
      "        extra={'content_type': 'application/octet-stream'})"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 105
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Finally let us upload all the chunks and labels from the MNIST8M dataset in parallel to speedup the upload. As IPython does not seem to be fully compatible with gevent monkeypatching we will use Python threads to upload data in parallel: "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "n_workers = 10\n",
      "filenames = os.listdir(NUMPY_DATA_FOLDER)\n",
      "\n",
      "tic = time()\n",
      "with ThreadPoolExecutor(max_workers=n_workers) as e:\n",
      "    for f in filenames:\n",
      "        e.submit(upload_object, local_folder, f)\n",
      "print(\"Uploaded {} files with {} workers in {:0.3f}s\".format(\n",
      "      len(filenames), n_workers, time() - tic))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "Uploaded 83 files with 10 workers in 281.750s\n"
       ]
      }
     ],
     "prompt_number": 106
    }
   ],
   "metadata": {}
  }
 ]
}